summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README30
-rw-r--r--glucometerutils/drivers/sdcodefree.py165
-rw-r--r--setup.py2
3 files changed, 94 insertions, 103 deletions
diff --git a/README b/README
index 1f42487..a5dcd32 100644
--- a/README
+++ b/README
@@ -32,21 +32,21 @@ $ . glucometerutils-venv/bin/activate
Please see the following table for the driver for each device that is known and
supported.
-| Manufacturer | Model Name | Driver | Dependencies |
-| --- | --- | --- | --- |
-| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] |
-| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] |
-| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] |
-| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] |
-| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] |
-| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] |
-| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Roche | Accu-Chek Mobile | `accuchek_reports` | |
-| SD Biosensor | SD CodeFree | `sdcodefree` | [pyserial] |
+| Manufacturer | Model Name | Driver | Dependencies |
+| --- | --- | --- | --- |
+| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] |
+| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] |
+| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] |
+| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] |
+| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] |
+| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] |
+| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Roche | Accu-Chek Mobile | `accuchek_reports` | |
+| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] |
† Untested.
‡ Optional dependency on Linux; required on other operating systems.
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index 4a375bd..2d145cc 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -19,96 +19,83 @@ __email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2017, Diego Elio Pettenò'
__license__ = 'MIT'
-import array
-import collections
+import binascii
import datetime
+import enum
import functools
import logging
import operator
-import struct
-import time
+
+import construct
from glucometerutils import common
from glucometerutils import exceptions
from glucometerutils.support import serial
-_STX = 0x53 # Not really 'STX'
-_ETX = 0xAA # Not really 'ETX'
-
-_DIR_IN = 0x20
-_DIR_OUT = 0x10
-
-_IDX_STX = 0
-_IDX_DIRECTION = 1
-_IDX_LENGTH = 2
-_IDX_CHECKSUM = -2
-_IDX_ETX = -1
+def xor_checksum(msg):
+ return functools.reduce(operator.xor, msg)
-_RECV_PREAMBLE = b'\x53\x20'
+class Direction(enum.Enum):
+ In = 0x20
+ Out = 0x10
+
+_PACKET = construct.Struct(
+ 'stx' / construct.Const(construct.Byte, 0x53),
+ 'direction' / construct.SymmetricMapping(
+ construct.Byte,
+ {e: e.value for e in Direction}),
+ 'length' / construct.Rebuild(
+ construct.Byte, lambda ctx: len(ctx.message) + 2),
+ 'message' / construct.Bytes(length=lambda ctx: ctx.length - 2),
+ 'checksum' / construct.Checksum(
+ construct.Byte, xor_checksum, construct.this.message),
+ 'etx' / construct.Const(construct.Byte, 0xAA)
+)
+
+_FIRST_MESSAGE = construct.Struct(
+ construct.Const(construct.Byte, 0x30),
+ 'count' / construct.Int16ub,
+ construct.Const(construct.Byte, 0xAA)[19])
_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
-_RESPONSE_PACKET = b'\x10\x40'
-
-_DATE_SET_PACKET = b'\x10\x10'
+_RESPONSE_MESSAGE = b'\x10\x40'
-_DISCONNECT_PACKET = b'\x10\x60'
-_DISCONNECTED_PACKET = b'\x10\x70'
+_DATE_SET_MESSAGE = b'\x10\x10'
-_STRUCT_READINGS_COUNT = struct.Struct('>H')
+_DISCONNECT_MESSAGE = b'\x10\x60'
+_DISCONNECTED_MESSAGE = b'\x10\x70'
-_FETCH_PACKET = b'\x10\x60'
-
-_ReadingRecord = collections.namedtuple(
- '_ReadingRecord',
- ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute',
- 'value', 'meal_flag'))
-_STRUCT_READING = struct.Struct('>BBBBBBBHB')
+_FETCH_MESSAGE = b'\x10\x60'
_MEAL_FLAG = {
- 0x00: common.Meal.NONE,
- 0x10: common.Meal.BEFORE,
- 0x20: common.Meal.AFTER,
+ common.Meal.NONE: 0x00,
+ common.Meal.BEFORE: 0x10,
+ common.Meal.AFTER: 0x20,
}
-def parse_reading(msgdata):
- return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata))
+_READING = construct.Struct(
+ construct.Byte[2],
+ 'year' / construct.Byte,
+ 'month' / construct.Byte,
+ 'day' / construct.Byte,
+ 'hour' / construct.Byte,
+ 'minute' / construct.Byte,
+ 'value' / construct.Int16ub,
+ 'meal' / construct.SymmetricMapping(
+ construct.Byte, _MEAL_FLAG),
+ construct.Byte[7],
+)
-def xor_checksum(msg):
- return functools.reduce(operator.xor, msg)
class Device(serial.SerialDevice):
BAUDRATE = 38400
DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable.
TIMEOUT = 300 # We need to wait for data from the device.
- def read_packet(self):
- preamble = self.serial_.read(3)
- if len(preamble) != 3:
- raise exceptione.InvalidResponse(
- response='Expected 3 bytes, received %d' % len(preamble))
- if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE:
- raise exceptions.InvalidResponse(
- response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH])
-
- msglen = preamble[_IDX_LENGTH]
- message = self.serial_.read(msglen)
- if len(message) != msglen:
- raise exception.InvalidResponse(
- response='Expected %d bytes, received %d' %
- (msglen, len(message)))
- if message[_IDX_ETX] != _ETX:
- raise exception.InvalidResponse(
- response='Unexpected end-of-transmission byte: %02x' %
- message[_IDX_ETX])
-
- # Calculate the checksum up until before the checksum itself.
- msgdata = message[:_IDX_CHECKSUM]
-
- cksum = xor_checksum(msgdata)
- if cksum != message[_IDX_CHECKSUM]:
- raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum)
-
- return msgdata
+ def read_message(self):
+ pkt = _PACKET.parse_stream(self.serial_)
+ logging.debug('received packet: %r', pkt)
+ return pkt.message
def wait_and_ready(self):
challenge = self.serial_.read(1)
@@ -116,6 +103,7 @@ class Device(serial.SerialDevice):
# The first packet read may have a prefixed zero, it might be a bug in
# the cp210x driver or device, but discard it if found.
if challenge == b'\0':
+ logging.debug('spurious null byte received')
challege = self.serial_.read(1)
if challenge != b'\x53':
raise exceptions.ConnectionFailed(
@@ -127,30 +115,33 @@ class Device(serial.SerialDevice):
raise exceptions.ConnectionFailed(
message='Unexpected challenge %r' % challenge)
- self.send_packet(_RESPONSE_PACKET)
+ logging.debug(
+ 'challenge packet received: %s', binascii.hexlify(challenge))
+
+ self.send_message(_RESPONSE_MESSAGE)
# The first packet only contains the counter of how many readings are
# available.
- first_packet = self.read_packet()
-
- count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1)
+ first_message = _FIRST_MESSAGE.parse(self.read_message())
+ logging.debug('received first message: %r', first_message)
- return count[0]
+ return first_message.count
- def send_packet(self, msgdata):
- packet = array.array('B')
- packet.extend((_STX, _DIR_OUT, len(msgdata)+2))
- packet.extend(msgdata)
- packet.extend((xor_checksum(msgdata), _ETX))
- self.serial_.write(packet.tobytes())
+ def send_message(self, message):
+ pkt = _PACKET.build({
+ 'message': message,
+ 'direction': Direction.Out
+ })
+ logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ self.serial_.write(pkt)
def connect(self):
print("Please connect and turn on the device.")
def disconnect(self):
- self.send_packet(_DISCONNECT_PACKET)
- response = self.read_packet()
- if response != _DISCONNECTED_PACKET:
+ self.send_message(_DISCONNECT_MESSAGE)
+ response = self.read_message()
+ if response != _DISCONNECTED_MESSAGE:
raise exceptions.InvalidResponse(response=response)
def get_meter_info(self):
@@ -175,9 +166,9 @@ class Device(serial.SerialDevice):
# Ignore the readings count.
self.wait_and_ready()
- self.send_packet(setdatecmd)
- response = self.read_packet()
- if response != _DATE_SET_PACKET:
+ self.send_message(setdatecmd)
+ response = self.read_message()
+ if response != _DATE_SET_MESSAGE:
raise exceptions.InvalidResponse(response=response)
# The date we return should only include up to minute, unfortunately.
@@ -185,19 +176,19 @@ class Device(serial.SerialDevice):
date.hour, date.minute)
def zero_log(self):
- raise NotmplementedError
+ raise NotImplementedError
def get_readings(self):
count = self.wait_and_ready()
for _ in range(count):
- self.send_packet(_FETCH_PACKET)
- rpkt = self.read_packet()
+ self.send_message(_FETCH_MESSAGE)
+ message = self.read_message()
- r = parse_reading(rpkt)
- meal = _MEAL_FLAG[r.meal_flag]
+ r = _READING.parse(message)
+ logging.debug('received reading: %r', r)
yield common.GlucoseReading(
datetime.datetime(
2000 + r.year, r.month, r.day, r.hour, r.minute),
- r.value, meal=meal)
+ r.value, meal=r.meal)
diff --git a/setup.py b/setup.py
index c49745a..a9b7c18 100644
--- a/setup.py
+++ b/setup.py
@@ -56,7 +56,7 @@ setup(
'fsoptium': ['pyserial'],
'fsprecisionneo': ['construct', 'hidapi'],
'accucheck_reports': [],
- 'sdcodefree': ['pyserial'],
+ 'sdcodefree': ['construct', 'pyserial'],
},
entry_points = {
'console_scripts': [